package com.data.mall.etl.service.impl;

import com.data.mall.common.KafkaTrackingDTO;
import com.data.mall.etl.domain.enums.Params;
import com.data.mall.etl.service.UserEventETL;
import com.data.mall.service.dto.OrderDTO;
import com.data.mall.service.dto.UserDTO;
import com.data.mall.utils.CKUtils;
import com.data.mall.utils.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.sql.SQLException;
import java.util.Map;

/**
 * @author shixukai
 * @Package com.data.mall.etl.impl
 * @Description: TODO
 * @date 2021/4/29
 */
@Slf4j
@Service
public class UserEventETLImpl implements UserEventETL {

    @Autowired
    private CKUtils ckUtils;

    @Override
    public Object userViewEvent(KafkaTrackingDTO dto) {
        // 商详浏览行为
        insertMemberViewBehavior(dto);
        return dto;
    }

    @Override
    public Object userOrderEvent(KafkaTrackingDTO dto) {
        // 下单行为
        insertMemberOrderBehavior(dto);
        return dto;
    }

    @Override
    public Object userLoginEvent(KafkaTrackingDTO dto) {
        //用户登录
        insertMemberLogin(dto);
        return dto;
    }

    @Override
    public Object userReg(KafkaTrackingDTO dto) {
        // 用户信息
        insertMemberInfo(dto);
        return dto;
    }

    private void insertMemberLogin(KafkaTrackingDTO dto){
        Map<Params, String> param = dto.getParamList();
        String event = dto.getEvent().name();
        Long userId = Long.valueOf(param.get(Params.USER_ID));
        String time = param.get(Params.TIME);
        String sql = "insert into datamall.user_login(user_id,event,time) VALUES" +
                "(" +
                userId + "," + ",'" + event + "','" + time + "'" +
                ")";
        try {
            ckUtils.insert(sql);
        } catch (SQLException e) {
            log.error("insertMemberBehavior error");
        }
    }

    private void insertMemberViewBehavior(KafkaTrackingDTO dto) {
        Map<Params, String> param = dto.getParamList();
        String event = dto.getEvent().name();
        Long userId = Long.valueOf(param.get(Params.USER_ID));
        Long itemId = Long.valueOf(param.get(Params.ITEM_ID));
        String time = param.get(Params.TIME);
        long promotionId = 0;
        if(param.containsKey(Params.PROMOTION_ID)){
            promotionId = Long.valueOf(param.get(Params.PROMOTION_ID));
        }
        String sql = "insert into datamall.user_view(user_id,item_id,event,time,promotion_id) VALUES" +
                "(" +
                userId + "," + itemId + "," + ",'" + event + "','" + time + "','" + promotionId + "'"+
                ")";
        try {
            ckUtils.insert(sql);
        } catch (SQLException e) {
            log.error("insertMemberBehavior error");
        }
    }

    private void insertMemberOrderBehavior(KafkaTrackingDTO dto) {
        Map<Params, String> param = dto.getParamList();
        String event = dto.getEvent().name();
        OrderDTO order = JSON.parseObject(param.get(Params.ORDER), OrderDTO.class);
        String time = param.get(Params.TIME);
        String sql = "insert into datamall.user_order(user_id,item_id,event,time,order_no,item_price,amount,order_price,promotion_id) VALUES" +
                "(" +
                order.getUserId() + "," + order.getItemId() + ",'" + event + "','" + time + "','" +
                order.getOrderNo() + "','" + order.getItemPrice() + "'," + order.getAmount() + ",'" +
                order.getOrderPrice() + "'," + order.getPromoId() +
                ")";
        try {
            ckUtils.insert(sql);
        } catch (SQLException e) {
            log.error("insertMemberBehavior error");
        }
    }

    private void insertMemberInfo(KafkaTrackingDTO dto) {
        Map<Params, String> param = dto.getParamList();
        UserDTO user = JSON.parseObject(param.get(Params.USER), UserDTO.class);
        String sql = "insert into datamall.dim_user(user_id,name,gender,age,mobile,register_mode,third_party_id) VALUES" +
                "(" +
                user.getId() + ",'" + user.getName() + "'," + user.getGender() + "," + user.getAge() + ",'" + user.getRegisterMode() + "','" + user.getThirdPartyId() + "'" +
                ")";
        try {
            ckUtils.insert(sql);
        } catch (SQLException e) {
            log.error("insertMemberInfo error");
        }
    }
}
